Make sure all dask-cudf supported aggs are handled in _tree_node_agg
#9487
Conversation
Codecov Report
@@ Coverage Diff @@
## branch-21.12 #9487 +/- ##
================================================
- Coverage 10.79% 10.68% -0.11%
================================================
Files 116 117 +1
Lines 18869 19444 +575
================================================
+ Hits 2036 2078 +42
- Misses 16833 17366 +533
Continue to review full report at Codecov.
Overall looks good to me, minor comments.
            as_index=self.as_index,
        )[self._slice]

    def var(self, split_every=None, split_out=1):
I added in overrides for var and std because it seems like trying to use upstream Dask's implementations here fails (a rough sketch of the idea behind these overrides follows the traceback below):
python/dask_cudf/dask_cudf/tests/test_groupby.py ............F
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> traceback >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
aggregation = 'std'
    @pytest.mark.parametrize("aggregation", SUPPORTED_AGGS)
    def test_groupby_basic_series(aggregation):
        pdf = pd.DataFrame(
            {
                "x": np.random.randint(0, 5, size=10000),
                "y": np.random.normal(size=10000),
            }
        )
        gdf = cudf.DataFrame.from_pandas(pdf)
        ddf = dask_cudf.from_cudf(gdf, npartitions=5)
        a = getattr(gdf.groupby("x").x, aggregation)()
>       b = getattr(ddf.groupby("x").x, aggregation)().compute()
python/dask_cudf/dask_cudf/tests/test_groupby.py:61:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../dask/dask/dataframe/groupby.py:1445: in std
v = self.var(ddof, split_every=split_every, split_out=split_out)
../dask/dask/dataframe/groupby.py:1439: in var
result = result[self._slice]
../dask/dask/dataframe/core.py:4062: in __getitem__
meta = self._meta[_extract_meta(key)]
../compose/etc/conda/cuda_11.2/envs/rapids/lib/python3.8/contextlib.py:75: in inner
return func(*args, **kwds)
python/cudf/cudf/core/dataframe.py:1007: in __getitem__
return self._get_columns_by_label(arg, downcast=True)
python/cudf/cudf/core/dataframe.py:1863: in _get_columns_by_label
new_data = super()._get_columns_by_label(labels, downcast)
python/cudf/cudf/core/frame.py:507: in _get_columns_by_label
return self._data.select_by_label(labels)
python/cudf/cudf/core/column_accessor.py:344: in select_by_label
return self._select_by_label_grouped(key)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = ColumnAccessor(multiindex=False, level_names=[None])
y: float64, key = 'x'
def _select_by_label_grouped(self, key: Any) -> ColumnAccessor:
> result = self._grouped_data[key]
E KeyError: 'x'
python/cudf/cudf/core/column_accessor.py:406: KeyError
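(Not part of the original comment.) A minimal, self-contained sketch of the idea these overrides rely on: groupby variance can be decomposed into sum/count/sum-of-squares pieces that reduce cleanly across partitions. The column names and the use of dask.compute here are illustrative only, not the actual dask_cudf implementation.

```python
import dask
import cudf
import dask_cudf

gdf = cudf.DataFrame({"key": [0, 0, 1, 1, 1], "value": [1.0, 2.0, 3.0, 4.0, 6.0]})
ddf = dask_cudf.from_cudf(gdf, npartitions=2)

ddof = 1

# Decompose var into pieces that reduce cleanly across partitions:
# var = (sum(x**2) - count(x) * mean(x)**2) / (count(x) - ddof)
squared = ddf.assign(value_sq=ddf.value ** 2)
grouped = squared.groupby("key")
n, total, total_sq = dask.compute(
    grouped.value.count(), grouped.value.sum(), grouped.value_sq.sum()
)

var = (total_sq - (total ** 2) / n) / (n - ddof)
print(var)  # should match gdf.groupby("key").value.var()
```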
Instead of adding these functions in individually, is it possible / would it make sense to add them in programmatically based on SUPPORTED_AGGS?
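(Not part of the original exchange.) A rough sketch, using hypothetical names (MySeriesGroupBy, _single_agg), of what attaching the aggregation methods programmatically from SUPPORTED_AGGS could look like; the real dask_cudf classes may need per-method differences (e.g. var/std taking a ddof argument), which is one reason explicit overrides can still be needed.

```python
# Hypothetical sketch -- not dask_cudf source code.
SUPPORTED_AGGS = ("count", "mean", "std", "var", "sum", "min", "max", "collect", "first", "last")


class MySeriesGroupBy:
    """Stand-in for a groupby wrapper that routes every agg through one helper."""

    def _single_agg(self, agg_name, split_every=None, split_out=1):
        # Stand-in for the real reduction machinery (e.g. a tree aggregation).
        print(f"running {agg_name!r} (split_every={split_every}, split_out={split_out})")


def _make_agg_method(agg_name):
    def method(self, split_every=None, split_out=1):
        return self._single_agg(agg_name, split_every=split_every, split_out=split_out)

    method.__name__ = agg_name
    return method


for _agg in SUPPORTED_AGGS:
    # Skip anything that already has an explicit override on the class.
    if not hasattr(MySeriesGroupBy, _agg):
        setattr(MySeriesGroupBy, _agg, _make_agg_method(_agg))


MySeriesGroupBy().sum(split_out=2)  # prints: running 'sum' (split_every=None, split_out=2)
```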
Thanks for this @charlesbluca! Just a few questions, mostly for my own understanding/benefit :)
@pytest.mark.parametrize("aggregation", ["sum", "mean", "count", "min", "max"])
def test_groupby_basic_aggs(aggregation):
@pytest.mark.parametrize("aggregation", SUPPORTED_AGGS)
def test_groupby_basic_frame(aggregation):
I don't think I understand how this is a frame test now? The test looks the same as test_groupby_basic_series?
The difference is pretty subtle; in test_groupby_basic_series we do the aggregation on one of the columns of the frame instead of the frame itself. Happy to make more explicit series groupby tests if needed.
Ah - I see now :)
It may make sense to combine the two tests and use pytest.mark.parametrize to cover both cases. However, this is not a blocker (I don't feel very strongly at all).
Good point - I consolidated the tests into one based on a series parameter 🙂
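(Not part of the original comment.) A sketch of what the consolidated, series-parametrized test might look like; the SUPPORTED_AGGS import path, the assert_eq comparison, and the special-casing of collect are assumptions rather than the exact code that was merged.

```python
import numpy as np
import pandas as pd
import pytest
from dask.dataframe.utils import assert_eq

import cudf
import dask_cudf
from dask_cudf.groupby import SUPPORTED_AGGS  # assumed import path


@pytest.mark.parametrize("aggregation", SUPPORTED_AGGS)
@pytest.mark.parametrize("series", [False, True])
def test_groupby_basic(series, aggregation):
    pdf = pd.DataFrame(
        {
            "x": np.random.randint(0, 5, size=10000),
            "y": np.random.normal(size=10000),
        }
    )
    gdf = cudf.DataFrame.from_pandas(pdf)
    ddf = dask_cudf.from_cudf(gdf, npartitions=5)

    # The only difference between the frame and series cases is whether we
    # select a single column before aggregating.
    gdf_grouped = gdf.groupby("x")
    ddf_grouped = ddf.groupby("x")
    if series:
        gdf_grouped = gdf_grouped.x
        ddf_grouped = ddf_grouped.x

    a = getattr(gdf_grouped, aggregation)()
    b = getattr(ddf_grouped, aggregation)().compute()

    if aggregation == "collect":
        # List results need element-wise comparison; keep the check coarse here.
        assert len(a) == len(b)
    else:
        assert_eq(a.sort_index(), b.sort_index())
```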
I think the (mostly) duplicated test could be simplified a bit, but these changes generally LGTM. Thanks!
@gpucibot merge
I noticed that for some of dask-cudf's supported aggregations (specifically first and last), we end up throwing a ValueError in _tree_node_agg because we do not have a case for them.

This PR unifies all references to _supported to now reference the module-level SUPPORTED_AGGS, and makes sure all aggs in this variable are handled in _tree_node_agg. This variable is also imported and used in test_groupby_basic_aggs, so that we don't need to remember to add an agg to that test manually.

Additionally added CudfDataFrameGroupBy.collect to get these updated tests passing.
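(Not part of the PR description.) To make the failure mode concrete, here is an illustrative sketch, not the dask_cudf source, of the kind of dispatch _tree_node_agg performs: every name in SUPPORTED_AGGS needs a branch in the combine step, and anything that falls through hits the ValueError, which is what happened for first and last before this change.

```python
# Illustrative only; the real dask_cudf logic dispatches on generated column
# names and handles mean/std/var via sum/count/sum-of-squares columns.
SUPPORTED_AGGS = ("count", "mean", "std", "var", "sum", "min", "max", "collect", "first", "last")


def _combine_for(agg):
    """Return the reduction used to combine partial per-partition results."""
    if agg in ("count", "sum", "mean", "std", "var"):
        return "sum"      # partial sums and counts simply add up
    elif agg in ("min", "max", "first", "last"):
        return agg        # these combine with themselves
    elif agg == "collect":
        return "collect"  # partial lists are concatenated group-wise
    else:
        raise ValueError(f"Unexpected aggregation: {agg}")


# Sanity check: no supported aggregation should fall through to the ValueError.
for agg in SUPPORTED_AGGS:
    _combine_for(agg)
```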